package com.lg.delay.queue.service.impl;

import com.lg.delay.queue.common.exception.BusinessException;
import com.lg.delay.queue.constant.ErrorMessageEnum;
import com.lg.delay.queue.constant.RedisQueueKey;
import com.lg.delay.queue.pojo.Job;
import com.lg.delay.queue.pojo.JobDie;
import com.lg.delay.queue.service.RedisDelayQueueService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * @author by Mr. Li 2020/12/9 16:54
 */
@Service
@Slf4j
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 添加job 到redis中
     *
     * @param job 元信息
     */
    @Override
    public void addJob(Job job) {
        // 获取添加锁
        RLock lock = redissonClient.getLock(RedisQueueKey.ADD_JOB_LOCK + job.getJobId());
        try {
            boolean b = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!b) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
            // 将 job 添加到JobPool中
            RMap<Object, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.get(topicId) != null) {
                throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
            }
            jobPool.put(topicId, job);

            // 将job添加到 DelayBucket中
            RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }

    /**
     * 删除 job
     *
     * @param jobDie job
     */
    @Override
    public void deleteJob(JobDie jobDie) {
        // 获取删除锁
        RLock lock = redissonClient.getLock(RedisQueueKey.DELETE_JOB_LOCK + jobDie.getJobId());
        try {
            boolean b = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!b) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());

            RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);

            RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);

        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}
